# Get data from vnstock

In [1]:
import pandas as pd
import time
import os  # <-- Thêm os để kiểm tra file
from tqdm import tqdm
from vnstock import Trading, Quote, Listing, Company
import concurrent.futures

# --- HẰNG SỐ CẤU HÌNH ---
CHUNK_SIZE_DETAIL = 30 # Số mã xử lý chi tiết (theo yêu cầu của bạn)
MAX_WORKERS = 1       # Số thread chạy song song
PAUSE_TIME_SUCCESS = 20 # (QUAN TRỌNG) Nghỉ 60 giây sau MỖI LÔ 30 mã (thành công)
PAUSE_TIME_CRASH = 45   # (QUAN TRỌNG) Nghỉ 45 giây khi bị CRASH (lỗi 41s)
OUTPUT_DIR = './stock-data/vnstock/vnstock_finally/'

# --- HÀM TIỆN ÍCH (Không đổi) ---

def create_chunks(data, chunk_size):
    """Chia một danh sách thành các danh sách con có kích thước chunk_size."""
    for i in range(0, len(data), chunk_size):
        yield data[i:i + chunk_size]

def fetch_data(symbol):
    """
    Hàm worker cho mỗi thread, lấy overview và profile cho 1 mã.
    """
    overview_df = None
    profile_df = None
    try:
        company = Company(symbol=symbol, source='TCBS')
        
        # 1. Lấy overview
        try:
            overview_df = company.overview()
            if overview_df.empty: overview_df = None
        except Exception:
            pass # Lỗi overview, bỏ qua

        # 2. Lấy profile
        try:
            profile_df = company.profile()
            if not profile_df.empty:
                profile_df['symbol'] = symbol 
            else:
                profile_df = None
        except Exception:
            pass # Lỗi profile, bỏ qua
            
    except Exception:
        pass # Lỗi khởi tạo Company
        
    return (symbol, overview_df, profile_df)

def save_batch_to_csv(results_in_chunk, price_df_full, output_path, is_first_save):
    """
    Hàm xử lý kết quả của 1 lô và LƯU (APPEND) VÀO FILE
    'results_in_chunk' là list các tuple: [(symbol, ov_df, pr_df), ...]
    """
    
    # --- 1. TÁCH KẾT QUẢ ---
    # Lấy TẤT CẢ các mã đã được xử lý trong lô này (kể cả lỗi)
    all_symbols_in_batch = [symbol for symbol, _, _ in results_in_chunk]
    
    # Chỉ lấy các DataFrame thành công
    all_overviews = [ov_df for _, ov_df, _ in results_in_chunk if ov_df is not None]
    all_profiles = [pr_df for _, _, pr_df in results_in_chunk if pr_df is not None]
    
    if not all_symbols_in_batch:
        print("\nLô không có kết quả, bỏ qua lưu.")
        return

    # --- 2. CHUẨN BỊ MERGE ---
    # Lấy dữ liệu giá cho TẤT CẢ các mã trong lô
    price_batch_df = price_df_full[price_df_full['symbol'].isin(all_symbols_in_batch)].copy()
    
    # Concat các df thành công
    overview_batch_df = pd.concat(all_overviews, ignore_index=True) if all_overviews else pd.DataFrame()
    profile_batch_df = pd.concat(all_profiles, ignore_index=True) if all_profiles else pd.DataFrame()

    # --- 3. MERGE (ĐÃ SỬA LỖI) ---
    # Bắt đầu với df giá (đây là df chính của lô này)
    merged_df = price_batch_df

    # Chỉ merge overview NẾU overview_batch_df không rỗng
    if not overview_batch_df.empty:
        merged_df = pd.merge(
            merged_df,
            overview_batch_df,
            on='symbol',
            how='left',
            suffixes=('_price', '_overview')
        )
    
    # Chỉ merge profile NẾU profile_batch_df không rỗng
    if not profile_batch_df.empty:
        merged_df = pd.merge(
            merged_df,
            profile_batch_df,
            on='symbol',
            how='left',
            suffixes=('_overview', '_profile')
        )
    
    # --- 4. ĐIỀN "Not found" (THEO YÊU CẦU) ---
    price_cols = set(price_batch_df.columns)
    merged_cols = set(merged_df.columns)
    new_cols = list(merged_cols - price_cols)
    
    if new_cols: 
        merged_df[new_cols] = merged_df[new_cols].fillna("Not found")
    else:
        # Xử lý trường hợp 4 mã đều lỗi -> không có new_cols
        print("\nKhông có cột mới nào được thêm (tất cả mã trong lô này đều lỗi).")
        # 'merged_df' lúc này chỉ chứa dữ liệu giá của 4 mã lỗi.
    
    # --- 5. LƯU (APPEND) VÀO FILE ---
    # Dù lỗi, 4 mã này VẪN ĐƯỢC LƯU (chỉ với dữ liệu giá)
    merged_df.to_csv(
        output_path,
        mode='a', # 'a' = append (nối vào file)
        header=is_first_save, # Chỉ ghi header nếu là lần lưu đầu tiên
        index=False,
        encoding='utf-8-sig'
    )
    print(f"\nĐã lưu {len(merged_df)} mã trong lô vào file {output_path} (kể cả mã lỗi)")


# --- HÀM MAIN THỰC THI ---
def main():
    print("--- Bắt đầu script ETL ---")
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # --- 1. Lấy danh sách mã ---
    try:
        listing_obj = Listing()
        df_symbols = listing_obj.all_symbols()
        vnstock_list = df_symbols['symbol'].to_list()
        print(f"Tìm thấy {len(vnstock_list)} mã. Bắt đầu tải...")
    except Exception as e:
        print(f"Không thể lấy danh sách mã: {e}")
        vnstock_list = [] 

    # --- 2. Lấy bảng giá (ĐÃ SỬA LỖI RATE LIMIT) ---
    trading = Trading()

    data_df = trading.price_board(symbols_list=vnstock_list)['listing']

    # --- 2. Xác định file output dựa vào trading_date ---
    trading_date = data_df['trading_date'].iloc[0]
    output_path = os.path.join(OUTPUT_DIR, f'{trading_date}.csv')
    print(f"File output cho hôm nay là: {output_path}")

    # --- 3. VÒNG LẶP RETRY CHÍNH ---
    # Vòng lặp này sẽ chạy liên tục cho đến khi tất cả các mã được xử lý
    while True:
        # 3a. Xác định các mã CÒN LẠI cần xử lý
        full_ticker_list = data_df['symbol'].unique()
        processed_symbols = set()
        is_first_save = True

        if os.path.exists(output_path):
            try:
                print(f"Đang đọc file đã xử lý: {output_path}")
                processed_df = pd.read_csv(output_path)
                processed_symbols = set(processed_df['symbol'].unique())
                is_first_save = False # File đã tồn tại, không ghi header nữa
                print(f"Đã tìm thấy {len(processed_symbols)} mã đã xử lý.")
            except pd.errors.EmptyDataError:
                print("File output rỗng, sẽ ghi đè.")
                is_first_save = True
            except Exception as e:
                print(f"Lỗi đọc file CSV: {e}. Sẽ thử lại sau 10s...")
                time.sleep(10)
                continue # Quay lại đầu vòng lặp while True

        remaining_tickers = [t for t in full_ticker_list if t not in processed_symbols]
        
        if not remaining_tickers:
            print(f"--- DỮ LIỆU NGÀY {trading_date} ĐÃ HOÀN TẤT ---")
            break # Thoát khỏi vòng lặp while True

        print(f"Tổng: {len(full_ticker_list)} | Đã xử lý: {len(processed_symbols)} | Còn lại: {len(remaining_tickers)}")
        
        # 3b. Chia các mã CÒN LẠI thành các lô
        ticker_chunks = list(create_chunks(remaining_tickers, CHUNK_SIZE_DETAIL))
        
        # 3c. Xử lý các lô (và bắt lỗi crash)
        try:
            for chunk in tqdm(ticker_chunks, desc=f"Đang xử lý {len(remaining_tickers)} mã còn lại"):
                
                # Chạy 1 lô (30 mã) song song
                with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
                    results_in_chunk = list(executor.map(fetch_data, chunk))
                
                # Lưu kết quả của lô này (mode='a')
                save_batch_to_csv(results_in_chunk, data_df, output_path, is_first_save)
                is_first_save = False # Đảm bảo không ghi header ở các lô sau

                # Nghỉ sau 1 lô THÀNH CÔNG
                print(f"--- Đã lưu lô. Tạm nghỉ {PAUSE_TIME_SUCCESS} giây (thành công)...")
                time.sleep(PAUSE_TIME_SUCCESS)

            # Nếu vòng lặp for chạy xong mà không crash, nghĩa là đã xong
            print("--- Vòng lặp for hoàn tất (không crash) ---")
            # Vòng lặp while True sẽ chạy lại, kiểm tra remaining_tickers (lúc này rỗng) và thoát
            
        except SystemExit as e:
            # BẮT LỖI CRASH (RATE LIMIT)
            print(f"\n--- !!! LỖI RATE LIMIT !!! ---")
            print(f"Lỗi: {e}")
            print(f"Script bị TCBS chặn. Sẽ tự động thử lại sau {PAUSE_TIME_CRASH} giây...")
            time.sleep(PAUSE_TIME_CRASH)
            # Vòng lặp while True sẽ tự động chạy lại từ bước 3a
            
        except Exception as e:
            # Bắt các lỗi bất ngờ khác (ví dụ: mất mạng)
            print(f"\n--- !!! LỖI BẤT NGỜ !!! ---")
            print(f"Lỗi: {e}. Sẽ thử lại sau {PAUSE_TIME_CRASH} giây...")
            time.sleep(PAUSE_TIME_CRASH)
            # Vòng lặp while True cũng sẽ tự động chạy lại

# --- Chạy script ---
if __name__ == "__main__":
    main()

--- Bắt đầu script ETL ---
Tìm thấy 1723 mã. Bắt đầu tải...
File output cho hôm nay là: ./stock-data/vnstock/vnstock_finally/2025-10-24.csv
Đang đọc file đã xử lý: ./stock-data/vnstock/vnstock_finally/2025-10-24.csv
Đã tìm thấy 1695 mã đã xử lý.
Tổng: 1699 | Đã xử lý: 1695 | Còn lại: 4


Đang xử lý 4 mã còn lại:   0%|          | 0/1 [00:00<?, ?it/s]


Không có cột mới nào được thêm (tất cả mã trong lô này đều lỗi).

Đã lưu 28 mã trong lô vào file ./stock-data/vnstock/vnstock_finally/2025-10-24.csv (kể cả mã lỗi)
--- Đã lưu lô. Tạm nghỉ 20 giây (thành công)...


Đang xử lý 4 mã còn lại: 100%|██████████| 1/1 [00:44<00:00, 44.27s/it]


--- Vòng lặp for hoàn tất (không crash) ---
Đang đọc file đã xử lý: ./stock-data/vnstock/vnstock_finally/2025-10-24.csv
Đã tìm thấy 1699 mã đã xử lý.
--- DỮ LIỆU NGÀY 2025-10-24 ĐÃ HOÀN TẤT ---


## MinIO S3

In [None]:
import pandas as pd
import time
import os
from tqdm import tqdm
from vnstock import Trading, Quote, Listing, Company
import concurrent.futures

# NEW: s3fs cho MinIO/S3
import s3fs

# --- HẰNG SỐ CẤU HÌNH ---
CHUNK_SIZE_DETAIL = 30  # Số mã xử lý chi tiết (theo yêu cầu của bạn)
MAX_WORKERS = 1         # Số thread chạy song song
PAUSE_TIME_SUCCESS = 20 # Nghỉ sau MỖI LÔ (thành công)
PAUSE_TIME_CRASH = 45   # Nghỉ khi bị crash/rate limit

# --- BẬT/TẮT LƯU S3 ---
USE_S3 = True

# --- THÔNG SỐ S3/MINIO ---
S3_ENDPOINT = os.getenv("AWS_S3_ENDPOINT", "http://host.docker.internal:9000/")
S3_KEY = os.getenv("AWS_ACCESS_KEY_ID", "admin")
S3_SECRET = os.getenv("AWS_SECRET_ACCESS_KEY", "password")

S3_BUCKET = "lakehouse"
S3_PREFIX = "raw/vnstock/vnstock_finally"  # không có slash đầu/cuối

# Nếu vẫn muốn lưu local khi USE_S3=False
OUTPUT_DIR = "./stock-data/vnstock/vnstock_finally/"

# Khởi tạo filesystem cho S3
fs = s3fs.S3FileSystem(
    key=S3_KEY,
    secret=S3_SECRET,
    client_kwargs={"endpoint_url": S3_ENDPOINT},
    anon=False,
)

def s3_path_for_trading_date(trading_date: str) -> str:
    """
    Trả về full s3:// path cho file output của một ngày giao dịch.
    """
    return f"s3://{S3_BUCKET}/{S3_PREFIX}/{trading_date}.csv"

def s3_exists(path: str) -> bool:
    return fs.exists(path)

def s3_read_csv(path: str) -> pd.DataFrame:
    return pd.read_csv(
        path,
        storage_options={"key": S3_KEY, "secret": S3_SECRET, "client_kwargs": {"endpoint_url": S3_ENDPOINT}},
    )

def s3_write_csv(df: pd.DataFrame, path: str, header: bool):
    """
    Ghi CSV lên S3. Nếu header=True, dùng wb (ghi mới).
    Nếu header=False, dùng ab (append).
    """
    mode = "wb" if header else "ab"
    with fs.open(path, mode) as f:
        df.to_csv(f, header=header, index=False, encoding="utf-8-sig")

def local_exists(path: str) -> bool:
    return os.path.exists(path)

def local_read_csv(path: str) -> pd.DataFrame:
    return pd.read_csv(path)

def local_write_csv(df: pd.DataFrame, path: str, header: bool):
    df.to_csv(
        path,
        mode=("w" if header else "a"),
        header=header,
        index=False,
        encoding="utf-8-sig",
    )

def create_chunks(data, chunk_size):
    for i in range(0, len(data), chunk_size):
        yield data[i:i + chunk_size]

def fetch_data(symbol):
    overview_df = None
    profile_df = None
    try:
        company = Company(symbol=symbol, source='TCBS')
        try:
            overview_df = company.overview()
            if overview_df.empty:
                overview_df = None
        except Exception:
            pass
        try:
            profile_df = company.profile()
            if not profile_df.empty:
                profile_df['symbol'] = symbol
            else:
                profile_df = None
        except Exception:
            pass
    except Exception:
        pass
    return (symbol, overview_df, profile_df)

def save_batch_to_csv(results_in_chunk, price_df_full, output_path, is_first_save, use_s3: bool):
    # 1) TÁCH KẾT QUẢ
    all_symbols_in_batch = [symbol for symbol, _, _ in results_in_chunk]
    all_overviews = [ov_df for _, ov_df, _ in results_in_chunk if ov_df is not None]
    all_profiles = [pr_df for _, _, pr_df in results_in_chunk if pr_df is not None]

    if not all_symbols_in_batch:
        print("\nLô không có kết quả, bỏ qua lưu.")
        return

    # 2) CHUẨN BỊ MERGE
    price_batch_df = price_df_full[price_df_full['symbol'].isin(all_symbols_in_batch)].copy()
    overview_batch_df = pd.concat(all_overviews, ignore_index=True) if all_overviews else pd.DataFrame()
    profile_batch_df = pd.concat(all_profiles, ignore_index=True) if all_profiles else pd.DataFrame()

    # 3) MERGE
    merged_df = price_batch_df
    if not overview_batch_df.empty:
        merged_df = pd.merge(
            merged_df,
            overview_batch_df,
            on='symbol',
            how='left',
            suffixes=('_price', '_overview')
        )
    if not profile_batch_df.empty:
        merged_df = pd.merge(
            merged_df,
            profile_batch_df,
            on='symbol',
            how='left',
            suffixes=('_overview', '_profile')
        )

    # 4) ĐIỀN "Not found"
    price_cols = set(price_batch_df.columns)
    merged_cols = set(merged_df.columns)
    new_cols = list(merged_cols - price_cols)
    if new_cols:
        merged_df[new_cols] = merged_df[new_cols].fillna("Not found")
    else:
        print("\nKhông có cột mới nào được thêm (tất cả mã trong lô này đều lỗi).")

    # 5) LƯU (APPEND) VÀO FILE (S3 hoặc local)
    if use_s3:
        s3_write_csv(merged_df, output_path, header=is_first_save)
    else:
        # Đảm bảo thư mục local tồn tại
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        local_write_csv(merged_df, output_path, header=is_first_save)

    print(f"\nĐã lưu {len(merged_df)} mã trong lô vào {'S3' if use_s3 else 'file'} {output_path} (kể cả mã lỗi)")

def main():
    print("--- Bắt đầu script ETL ---")
    if not USE_S3:
        os.makedirs(OUTPUT_DIR, exist_ok=True)

    # 1) Lấy danh sách mã
    try:
        listing_obj = Listing()
        df_symbols = listing_obj.all_symbols()
        vnstock_list = df_symbols['symbol'].to_list()
        print(f"Tìm thấy {len(vnstock_list)} mã. Bắt đầu tải...")
    except Exception as e:
        print(f"Không thể lấy danh sách mã: {e}")
        vnstock_list = []

    # 2) Lấy bảng giá
    trading = Trading()
    data_df = trading.price_board(symbols_list=vnstock_list)['listing']

    # 2b) Xác định output path
    trading_date = data_df['trading_date'].iloc[0]
    if USE_S3:
        output_path = s3_path_for_trading_date(trading_date)
    else:
        output_path = os.path.join(OUTPUT_DIR, f'{trading_date}.csv')

    print(f"File output cho hôm nay là: {output_path}")

    # 3) Vòng lặp retry
    while True:
        full_ticker_list = data_df['symbol'].unique()
        processed_symbols = set()
        is_first_save = True

        # Đọc file đã xử lý (nếu có)
        try:
            if USE_S3:
                if s3_exists(output_path):
                    print(f"Đang đọc file đã xử lý (S3): {output_path}")
                    processed_df = s3_read_csv(output_path)
                    processed_symbols = set(processed_df['symbol'].unique())
                    is_first_save = False
                    print(f"Đã tìm thấy {len(processed_symbols)} mã đã xử lý.")
            else:
                if local_exists(output_path):
                    print(f"Đang đọc file đã xử lý (local): {output_path}")
                    processed_df = local_read_csv(output_path)
                    processed_symbols = set(processed_df['symbol'].unique())
                    is_first_save = False
                    print(f"Đã tìm thấy {len(processed_symbols)} mã đã xử lý.")
        except pd.errors.EmptyDataError:
            print("File output rỗng, sẽ ghi đè.")
            is_first_save = True
        except Exception as e:
            print(f"Lỗi đọc file CSV: {e}. Sẽ thử lại sau 10s...")
            time.sleep(10)
            continue

        remaining_tickers = [t for t in full_ticker_list if t not in processed_symbols]
        if not remaining_tickers:
            print(f"--- DỮ LIỆU NGÀY {trading_date} ĐÃ HOÀN TẤT ---")
            break

        print(f"Tổng: {len(full_ticker_list)} | Đã xử lý: {len(processed_symbols)} | Còn lại: {len(remaining_tickers)}")

        ticker_chunks = list(create_chunks(remaining_tickers, CHUNK_SIZE_DETAIL))

        try:
            for chunk in tqdm(ticker_chunks, desc=f"Đang xử lý {len(remaining_tickers)} mã còn lại"):
                with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
                    results_in_chunk = list(executor.map(fetch_data, chunk))

                save_batch_to_csv(results_in_chunk, data_df, output_path, is_first_save, USE_S3)
                is_first_save = False

                print(f"--- Đã lưu lô. Tạm nghỉ {PAUSE_TIME_SUCCESS} giây (thành công)...")
                time.sleep(PAUSE_TIME_SUCCESS)

            print("--- Vòng lặp for hoàn tất (không crash) ---")

        except SystemExit as e:
            print(f"\n--- !!! LỖI RATE LIMIT !!! ---")
            print(f"Lỗi: {e}")
            print(f"Script bị TCBS chặn. Sẽ tự động thử lại sau {PAUSE_TIME_CRASH} giây...")
            time.sleep(PAUSE_TIME_CRASH)

        except Exception as e:
            print(f"\n--- !!! LỖI BẤT NGỜ !!! ---")
            print(f"Lỗi: {e}. Sẽ thử lại sau {PAUSE_TIME_CRASH} giây...")
            time.sleep(PAUSE_TIME_CRASH)

if __name__ == "__main__":
    main()


# VN

In [11]:
import pandas as pd

vn_path = r'stock-data\vnstock\vnstock_finally\2025-10-24.csv'
vn_df = pd.read_csv(vn_path, encoding='utf-8')
# vn_df['datadate'] = '2025-10-24'
# vn_df.to_csv(vn_path, index=False, encoding='utf-8-sig')

In [17]:
df_null_data = vn_df[vn_df['datadate'].isnull()]

In [16]:
df_null_data

Unnamed: 0,symbol,ceiling,floor,ref_price,stock_type,exchange_price,trading_status,trading_status_code,trading_status_group,security_status,...,industry_id,industry_id_v2,company_name,company_profile,history_dev,company_promise,business_risk,key_developments,business_strategies,datadate
11,XDC,87800.0,65000.0,76400.0,STOCK,UPCOM,,,,,...,181.0,2357.0,Tan Cang Construction Joint Stock Company,Tan Cang Construction Construction Company Lim...,"April 18, 1996: The predecessor of the compan...","The 2013 Land Law, revised to be completed i...",High competition in the industry. The compan...,"Construction and installation of waterway, tr...",Concentrating resources for business developm...,2025-10-24
48,VSP,1200.0,1000.0,1100.0,STOCK,UPCOM,,,,,...,191.0,2773.0,Viet Hai Shipping and Real Properties Corporation,Viet Hai Shipping and Real Properties Corpora...,The Company was established in 2002. 30/12/2...,The company provides maritime services to me...,The company was affected by global crisis du...,Producing and merchandising petrochemical pr...,"Investing in LPG truck groups, gas truck gro...",2025-10-24
96,VMI,1300.0,1100.0,1200.0,STOCK,UPCOM,,,,,...,181.0,2357.0,VISACO Mineral and Investement Joint Stock Com...,VISACO Mineral and Investement Joint Stock Com...,"VISACO, previously known as Vinashin constru...",Danang - Quangngai highway has been construc...,Vietnamese and global economy in general has...,Construct hot asphalt concrete; Implement co...,To set up management plans to employ and pro...,2025-10-24
109,VKD,133800.0,57400.0,95600.0,STOCK,UPCOM,,,,,...,216.0,3537.0,Khanh Hoa Mineral Water Joint Stock Company,Khanh Hoa Mineral Water Joint Stock Company (V...,"January 19, 1990: Formerly known as Dien Kha...",Danh Thanh - Vikoda brand is preferred by co...,"As the competition in the sector is harsh, t...",Producing mineral water and mineral water-re...,Researching to renew products' design and qu...,2025-10-24
115,VIS,10950.0,9540.0,10250.0,STOCK,HOSE,,,,,...,159.0,1757.0,Vietnam - Italy Steel Joint Stock Company,Viet Y Steel Joint Stock Company (VISCO) was e...,"December 26, 2003: Established from equitiza...","VIS steel brand has been confirmed, modern t...",The company is subjected to fierce competiti...,Producing and trading of steel products Prod...,Providing the market with construction steel...,2025-10-24
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1718,,,,,,,,,,,...,,,,,,,,,,2025-10-24
1719,,,,,,,,,,,...,,,,,,,,,,2025-10-24
1720,,,,,,,,,,,...,,,,,,,,,,2025-10-24
1721,,,,,,,,,,,...,,,,,,,,,,2025-10-24


In [3]:
vn_df.columns

Index(['symbol', 'ceiling', 'floor', 'ref_price', 'stock_type',
       'exchange_price', 'trading_status', 'trading_status_code',
       'trading_status_group', 'security_status', 'last_trading_date',
       'issue_date', 'listed_share', 'coupon_rate', 'yield', 'sending_time',
       'type', 'organ_name', 'mapping_symbol', 'product_grp_id', 'partition',
       'index_type', 'trading_date', 'lst_trading_status',
       'average_match_volume2_week', 'Unnamed: 25', 'id', 'prior_close_price',
       'benefit', 'exchange_overview', 'industry', 'company_type',
       'no_shareholders', 'foreign_percent', 'outstanding_share',
       'issue_share', 'established_year', 'no_employees', 'stock_rating',
       'delta_in_week', 'delta_in_month', 'delta_in_year', 'short_name',
       'website', 'industry_id', 'industry_id_v2', 'company_name',
       'company_profile', 'history_dev', 'company_promise', 'business_risk',
       'key_developments', 'business_strategies', 'datadate'],
      dtype='object

# US

In [1]:
import pandas as pd

us_path = r'stock-data\us\usstock_finally\2025-10-24.csv'
us_df = pd.read_csv(us_path, encoding='utf-8')
us_df['datadate'] = '2025-10-24'
# us_df.to_csv(us_path, index=False, encoding='utf-8-sig')

In [5]:
us_df.columns

Index(['symbol', 'company_name', 'sector', 'industry', 'website',
       'business_summary', 'full_time_employees', 'market_cap', 'country',
       'city', 'phone', 'previous_close', 'current_price', 'currency',
       'datadate'],
      dtype='object')

# JP

In [1]:
import pandas as pd

jp_path = r'stock-data\japan\jpstock_finally\2025-10-24.csv'
jp_df = pd.read_csv(jp_path, encoding='utf-8')
jp_df['datadate'] = '2025-10-24'
# jp_df.to_csv(jp_path, index=False, encoding='utf-8-sig')

In [2]:
jp_df

Unnamed: 0,ticker,company_name,company_name_jp,sector,industry,country,website,business_summary,employees,market_cap,currency,exchange,current_price,previous_close,method,datadate
0,9997,"Belluna Co., Ltd.",BELLUNA CO,Consumer Cyclical,Specialty Retail,Japan,https://www.belluna.co.jp,It operates through eight segments: Apparel an...,3884.0,9.747830e+10,JPY,JPX,1013.0,1017.0,yfinance,2025-10-24
1,9996,"Satoh & Co., Ltd.",SATOH & CO LTD,Consumer Defensive,Food Distribution,Japan,https://www.satoh-web.co.jp,"Satoh & Co., Ltd. sells commercial food produc...",696.0,1.665541e+10,JPY,JPX,1997.0,2015.0,yfinance,2025-10-24
2,9994,Yamaya Corporation,YAMAYA CORPORATION,Consumer Defensive,Beverages - Brewers,Japan,https://www.yamaya.jp,"Yamaya Corporation engages in the import, reta...",1792.0,2.518467e+10,JPY,JPX,2323.0,2333.0,yfinance,2025-10-24
3,9993,"Yamazawa Co., Ltd.",YAMAZAWA CO LTD,Consumer Defensive,Grocery Stores,Japan,https://www.yamazawa.co.jp,"Yamazawa Co., Ltd. engages in the retail busin...",1268.0,1.262412e+10,JPY,JPX,1170.0,1173.0,yfinance,2025-10-24
4,9991,Gecoss Corporation,GECOSS CORPORATION,Industrials,Rental & Leasing Services,Japan,https://www.gecoss.co.jp,Gecoss Corporation rents and sells constructio...,1379.0,4.804344e+10,JPY,JPX,1428.0,1410.0,yfinance,2025-10-24
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4408,1309,NEXT FUNDS ChinaAMC SSE50 Index Exchange Trade...,NOMURA ASSET MANAGEMENT CO LTD,,,Japan,,,,,JPY,JPX,,51430.0,yfinance,2025-10-24
4409,1308,Amova Exchange Traded Index Fund TOPIX,AMOVA ASSET MANAGEMENT CO LTD E,,,Japan,,,,,JPY,JPX,,3371.0,yfinance,2025-10-24
4410,1306,NEXT FUNDS TOPIX Exchange Traded Fund,NOMURA ASSET MANAGEMENT CO LTD,,,Japan,,,,,JPY,JPX,,3407.0,yfinance,2025-10-24
4411,1305,iFreeETF TOPIX (Yearly Dividend Type),DAIWA ASSET MANAGEMENT IFREEETF,,,Japan,,,,9.131478e+12,JPY,JPX,,3444.0,yfinance,2025-10-24


In [8]:
jp_df.columns

Index(['ticker', 'company_name', 'company_name_jp', 'sector', 'industry',
       'country', 'website', 'business_summary', 'employees', 'market_cap',
       'currency', 'exchange', 'current_price', 'previous_close', 'method',
       'datadate'],
      dtype='object')

In [1]:
import pyspark
from pyspark.sql import SparkSession
import os
import sys
from dotenv import load_dotenv
from pathlib import Path
from pyspark.sql import functions as F, types as T, DataFrame
from functools import reduce

load_dotenv()

NESSIE_URI = os.environ.get("NESSIE_URI") 
MINIO_ACCESS_KEY=os.environ.get("AWS_ACCESS_KEY_ID") 
MINIO_SECRET_KEY=os.environ.get("AWS_SECRET_ACCESS_KEY") 
MINIO_ENDPOINT=os.environ.get("AWS_S3_ENDPOINT") 

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable


spark = (
    SparkSession.builder
        .appName("Iceberg-Nessie-rest-catalog")
        .master("local[*]")
        # .config("spark.driver.host", "192.168.1.11")
        .config("spark.driver.host", "localhost")
        # .master("spark://host.docker.internal:7077")
        .config("spark.driver.memory", "2g")
        .config("spark.driver.cores", "1")
        .config("spark.executor.instances", "1")
        .config("spark.executor.cores", "2")
        .config("spark.executor.memory", "2g")
        .config("spark.jars.packages", ",".join([
            "org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2",
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1",
            "org.apache.iceberg:iceberg-aws-bundle:1.8.1",
            ## Minio
            "org.apache.hadoop:hadoop-aws:3.3.4",
            "com.amazonaws:aws-java-sdk-bundle:1.12.262"
        ]))
        .config("spark.sql.extensions", "org.projectnessie.spark.extensions.NessieSparkSessionExtensions,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.nessie.type", "rest")
        .config("spark.sql.catalog.nessie.uri", NESSIE_URI)
        ## Minio config
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
        .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
        .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .getOrCreate()
)

# STG

In [4]:
vn_path = r'stock-data\vnstock\vnstock_finally\2025-10-24.csv'
df_vn_raw = (
  spark.read
  .option("header", True)
  .option("multiLine", True)         
  .option("escape", '"')            
  .option("quote", '"')
  .option("encoding", "UTF-8")
  .option("mode", "PERMISSIVE")    
  .option("inferSchema", "true")  
  # .schema(custom_schema)          
  .csv(vn_path)
)

us_path = r'stock-data\us\usstock_finally\2025-10-24.csv'
df_us_raw = (
  spark.read
  .option("header", True)
  .option("multiLine", True)         
  .option("escape", '"')            
  .option("quote", '"')
  .option("encoding", "UTF-8")
  .option("mode", "PERMISSIVE")    
  .option("inferSchema", "true")  
  # .schema(custom_schema)          
  .csv(us_path)
)

jp_path = r'stock-data\japan\jpstock_finally\2025-10-24.csv'
df_jp_raw = (
  spark.read
  .option("header", True)
  .option("multiLine", True)         
  .option("escape", '"')            
  .option("quote", '"')
  .option("encoding", "UTF-8")
  .option("mode", "PERMISSIVE")    
  .option("inferSchema", "true")  
  # .schema(custom_schema)          
  .csv(jp_path)
)

In [6]:
df_jp_raw.printSchema()

root
 |-- ticker: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- company_name_jp: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- country: string (nullable = true)
 |-- website: string (nullable = true)
 |-- business_summary: string (nullable = true)
 |-- employees: double (nullable = true)
 |-- market_cap: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- exchange: string (nullable = true)
 |-- current_price: double (nullable = true)
 |-- previous_close: double (nullable = true)
 |-- method: string (nullable = true)
 |-- datadate: date (nullable = true)



In [9]:
test_df = df_jp_raw.where(F.col('ticker')=='9996')
test_df.show(truncate=False)

+------+-----------------+---------------+------------------+-----------------+-------+---------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------------+--------+--------+-------------+--------------+--------+----------+
|ticker|company_name     |company_name_jp|sector            |industry         |country|website                    |business_summary                                                                                                                                                                                                                                

In [12]:
df_us_raw.show(truncate=False)

+------+------------------------------------------------------------+------------------+----------------------------+--------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+--------------+--------------+--------------+--------------+--------------+--------------+----------+
|symbol|company_name                                                |sector            |industry                    |website                                                 |business_

In [5]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col

def write_iceberg_dynamic_partition(spark: SparkSession, 
                                    df: DataFrame, 
                                    table_name: str, 
                                    partition_cols: list = ["country", "datadate"]):
    """
    Ghi một DataFrame vào bảng Iceberg với cơ chế ghi đè partition động.

    - Nếu bảng chưa tồn tại, tạo bảng mới và partition theo partition_cols.
    - Nếu bảng đã tồn tại, chỉ ghi đè (overwrite) các partition
      có trong DataFrame nguồn (df).

    Args:
        spark (SparkSession): Spark session đã kích hoạt Iceberg.
        df (DataFrame): DataFrame đầu vào (phải chứa 2 cột partition).
        table_name (str): Tên của bảng Iceberg (ví dụ: 'catalog.db.my_table').
        partition_cols (list): Danh sách các cột partition. 
                               Thứ tự rất quan trọng.
                               [ngoài, trong] -> ["country", "datadate"].
    """
    
    # 1. Kiểm tra xem các cột partition có tồn tại trong DataFrame không
    for p_col in partition_cols:
        if p_col not in df.columns:
            raise ValueError(f"Cột partition '{p_col}' không tìm thấy trong DataFrame.")

    print(f"Bắt đầu quá trình ghi vào bảng: {table_name}")

    try:
        # 2. Kiểm tra xem bảng đã tồn tại hay chưa
        if not spark.catalog.tableExists(table_name):
            
            # === TRƯỜNG HỢP 1: Bảng CHƯA TỒN TẠI ===
            # Cần tạo bảng lần đầu. 
            # Chúng ta phải chỉ định .partitionBy() khi tạo bảng.
            
            print(f"Bảng {table_name} chưa tồn tại. Đang tạo bảng mới...")
            
            df.write \
              .format("iceberg") \
              .mode("overwrite") \
              .partitionBy(*partition_cols) \
              .saveAsTable(table_name)
              
            print(f"Đã tạo bảng mới {table_name} và ghi dữ liệu thành công.")

        else:
            
            # === TRƯỜNG HỢP 2: Bảng ĐÃ TỒN TẠI ===
            # Sử dụng chế độ "dynamic partition overwrite".
            
            print(f"Bảng {table_name} đã tồn tại. Áp dụng ghi đè partition động...")

            # Cấu hình Spark để bật chế độ ghi đè partition động
            # (Ghi đè dựa trên dữ liệu, không phải toàn bộ bảng)
            spark.conf.set("spark.sql.iceberg.partition-overwrite-mode", "dynamic")
            
            # Khi ghi, không cần .partitionBy() nữa vì bảng đã tồn tại
            # và đã có siêu dữ liệu (metadata) về partition.
            
            df.writeTo(table_name).overwritePartitions()
              
            
            print(f"Đã ghi đè partition động vào bảng {table_name} thành công.")

    except Exception as e:
        print(f"Đã xảy ra lỗi khi ghi vào bảng {table_name}: {e}")
        # raise e # Tùy chọn: ném lỗi ra ngoài nếu muốn dừng hẳn pipeline

# ===================================================================
# === VÍ DỤ CÁCH SỬ DỤNG ===
# ===================================================================

# (Giả sử bạn đã có một SparkSession tên 'spark' được cấu hình với Iceberg)

# --- Tạo dữ liệu mẫu lần 1 (Partition cho US & VN ngày 2025-10-31) ---
data1 = [
    ("JP", "2025-10-31", 101, "user_b"),
    ("VN", "2025-10-31", 200, "user_c"),
    ("VN", "2025-10-31", 202183612921210, "usejbdjkwbdjksdr_c")
]
schema = ["country", "datadate", "user_id", "user_name"]
df1 = spark.createDataFrame(data1, schema)

# Tên bảng
my_table = "nessie.stg.test" 

# --- GỌI HÀM LẦN 1 (Sẽ tạo bảng) ---
write_iceberg_dynamic_partition(spark, df1, my_table)


Bắt đầu quá trình ghi vào bảng: nessie.stg.test
Bảng nessie.stg.test đã tồn tại. Áp dụng ghi đè partition động...
Đã ghi đè partition động vào bảng nessie.stg.test thành công.


In [22]:

def norm_sym(c): return F.upper(F.trim(F.col(c)))
def as_double(c): return F.col(c).cast(T.DoubleType())
def as_long(c):   return F.col(c).cast(T.LongType())
def as_long_clean(c):
    return F.regexp_replace(F.col(c).cast("string"), r"[^0-9\.\-]", "").cast(T.LongType())
def as_double_clean(c):
    return F.regexp_replace(F.col(c).cast("string"), r"[^0-9\.\-]", "").cast(T.DoubleType())

# VN
stg_vn = (
  df_vn_raw
  .withColumn("country", F.lit("VN"))
  .withColumn("symbol", F.upper(F.trim(F.col("symbol"))))
  .withColumn("datadate", F.to_date("datadate"))
  .withColumn("company_name", F.col("company_name"))
  .withColumn("sector", F.col("industry"))
  .withColumn("industry", F.col("industry"))
  .withColumn("website", F.col("website"))
  .withColumn("employees", F.col("no_employees").cast("long"))
  # Làm sạch số
  .withColumn("ref_price_clean",      as_double_clean("ref_price"))
  .withColumn("prior_close_clean",    as_double_clean("prior_close_price"))
  .withColumn("ceiling",              as_double_clean("ceiling"))
  .withColumn("floor",                as_double_clean("floor"))
  .withColumn("foreign_percent",      as_double_clean("foreign_percent"))
  .withColumn("delta_in_week",        as_double_clean("delta_in_week"))
  .withColumn("delta_in_month",       as_double_clean("delta_in_month"))
  .withColumn("delta_in_year",        as_double_clean("delta_in_year"))
  .withColumn("avg_match_vol_2w",     F.col("average_match_volume2_week").cast("long"))
  .withColumn("outstanding_share",    F.col("outstanding_share")*F.lit(1_000_000).cast("long"))
  .withColumn("issue_share",          F.col("issue_share").cast("long"))
  # Exchange: lấy từ overview, nếu null thì lấy exchange_price (đang là mã sàn)
  .withColumn("exchange",
      F.coalesce(F.col("exchange_overview"), F.col("exchange_price")).cast("string")
  )
  # current_price fallback: không có giá khớp trong dataset này
  .withColumn("current_price",
      F.coalesce(F.col("ref_price_clean"), F.col("prior_close_clean"))
  )
  .withColumn("price_source",
      F.when(F.col("ref_price_clean").isNotNull(),   F.lit("ref_price"))
       .when(F.col("prior_close_clean").isNotNull(), F.lit("previous_close"))
       .otherwise(F.lit(None).cast("string"))
  )
  # previous_close
  .withColumn("previous_close", F.col("prior_close_clean"))
  # market_cap (tỷ VND)
  .withColumn(
      "market_cap",
      F.when(
          F.col("outstanding_share").isNotNull() & F.col("current_price").isNotNull(),
          (F.col("outstanding_share") * F.col("current_price"))
      )
  )
  # Các trường VN khác giữ nguyên/cast
  .withColumn("currency", F.lit("VND"))
  .withColumn("trading_status", F.col("trading_status"))
  .withColumn("trading_status_code", F.col("trading_status_code"))
  .withColumn("trading_status_group", F.col("trading_status_group"))
  .select(
    "symbol","country","datadate","company_name","sector","industry","website",
    "employees","market_cap","currency","exchange","current_price","previous_close",
    "ceiling","floor","ref_price_clean","trading_status","trading_status_code","trading_status_group",
    "foreign_percent","outstanding_share","issue_share","delta_in_week","delta_in_month","delta_in_year",
    "avg_match_vol_2w","price_source"
  )
  .withColumnRenamed("ref_price_clean","ref_price")
  .where(F.col("symbol").isNotNull() & (F.col("symbol") != ""))
)

# US
stg_us = (
  df_us_raw
  .withColumn("country", F.lit("US"))
  .withColumn("symbol", norm_sym("symbol"))
  .withColumn("datadate", F.to_date("datadate"))
  .withColumn("company_name", F.col("company_name"))
  .withColumn("sector", F.col("sector"))
  .withColumn("industry", F.col("industry"))
  .withColumn("website", F.col("website"))
  .withColumn("employees", as_long_clean("full_time_employees"))
  .withColumn("market_cap", as_double_clean("market_cap"))
  .withColumn("currency", F.col("currency"))
  .withColumn("exchange", F.lit(None).cast("string"))
  .withColumn("current_price", as_double_clean("current_price"))
  .withColumn("previous_close", as_double_clean("previous_close"))
  .select("symbol","country","datadate","company_name","sector","industry","website",
          "employees","market_cap","currency","exchange","current_price","previous_close")
  .where(F.col("symbol").isNotNull() & (F.col("symbol") != ""))
)

# JP
stg_jp = (
  df_jp_raw
  .withColumn("country", F.lit("JP"))
  .withColumn("symbol", norm_sym("ticker"))
  .withColumn("datadate", F.to_date("datadate"))
  .withColumn("company_name", F.col("company_name"))
  .withColumn("company_name_jp", F.col("company_name_jp"))
  .withColumn("sector", F.col("sector"))
  .withColumn("industry", F.col("industry"))
  .withColumn("website", F.col("website"))
  .withColumn("employees", as_long_clean("employees"))
  .withColumn("market_cap", as_double_clean("market_cap"))
  .withColumn("currency", F.col("currency"))
  .withColumn("exchange", F.col("exchange"))
  .withColumn("current_price", as_double_clean("current_price"))
  .withColumn("previous_close", as_double_clean("previous_close"))
  .select("symbol","country","datadate","company_name","company_name_jp","sector","industry","website",
          "employees","market_cap","currency","exchange","current_price","previous_close")
  .where(F.col("symbol").isNotNull() & (F.col("symbol") != ""))
)

TARGET_COLS = [
  "symbol","country","datadate","company_name","company_name_jp","sector","industry","website",
  "employees","market_cap","currency","exchange","current_price","previous_close",
  "ceiling","floor","ref_price","trading_status","trading_status_code","trading_status_group",
  "foreign_percent","outstanding_share","issue_share","delta_in_week","delta_in_month","delta_in_year",
  "avg_match_vol_2w"
]
TARGET_TYPES = {
  "symbol":"string","country":"string","datadate":"date","company_name":"string","company_name_jp":"string",
  "sector":"string","industry":"string","website":"string","employees":"long","market_cap":"double",
  "currency":"string","exchange":"string","current_price":"double","previous_close":"double",
  "ceiling":"double","floor":"double","ref_price":"double","trading_status":"string",
  "trading_status_code":"string","trading_status_group":"string","foreign_percent":"double",
  "outstanding_share":"long","issue_share":"long","delta_in_week":"double","delta_in_month":"double",
  "delta_in_year":"double","avg_match_vol_2w":"long"
}

def align(df):
    for c in TARGET_COLS:
        if c not in df.columns:
            df = df.withColumn(c, F.lit(None).cast(TARGET_TYPES[c]))
        else:
            df = df.withColumn(c, F.col(c).cast(TARGET_TYPES[c]))
    return df.select(*TARGET_COLS)

stg_union: DataFrame  = reduce(
    lambda a,b: a.unionByName(b, allowMissingColumns=True),
    [align(stg_vn), align(stg_us), align(stg_jp)]
)

In [21]:
stg_vn.show(truncate=False)

+------+-------+----------+-----------------------------------------------------------+---------------------------+---------------------------+-----------------------------------+---------+----------+--------+--------+-------------+--------------+--------+--------+---------+----------------------------------+-------------------+--------------------+---------------+-----------------+-----------+-------------+--------------+-------------+----------------+------------+
|symbol|country|datadate  |company_name                                               |sector                     |industry                   |website                            |employees|market_cap|currency|exchange|current_price|previous_close|ceiling |floor   |ref_price|trading_status                    |trading_status_code|trading_status_group|foreign_percent|outstanding_share|issue_share|delta_in_week|delta_in_month|delta_in_year|avg_match_vol_2w|price_source|
+------+-------+----------+-------------------------------

In [47]:
spark.sql("SHOW DATABASES IN nessie").show()

+---------+
|namespace|
+---------+
|     demo|
|      stg|
+---------+



In [23]:
table_name = f"nessie.stg.stg_stocks"
partition_key = "datadate"

if not spark.catalog.tableExists(table_name):
    print("Bảng chưa tồn tại. Đang tạo bảng mới (saveAsTable)...")
    (
        stg_union.writeTo(table_name)
                 .partitionedBy(partition_key)
                 .createOrReplace()
                 
    )
else:
    print("Bảng đã tồn tại. Ghi đè partition (insertInto)...")
    # Yêu cầu "có rồi thì overwritePartition"
    (
        stg_union.writeTo(table_name)
                 .partitionedBy(partition_key)
                 .overwritePartitions()
                 
    )

Bảng đã tồn tại. Ghi đè partition (insertInto)...


# Curated

In [24]:
stg_union = spark.table("nessie.stg.stg_stocks")

### dim_date

In [None]:
# Lấy ngày từ STG
dates = (stg_union.select(F.col("datadate").alias("date"))
         .where(F.col("date").isNotNull())
         .distinct())

dim_date = (dates
  .withColumn("date_sk", F.date_format("date", "yyyyMMdd").cast("int"))
  .withColumn("year", F.year("date"))
  .withColumn("quarter", F.quarter("date"))
  .withColumn("month", F.month("date"))
  .withColumn("day", F.dayofmonth("date"))
  .withColumn("week", F.weekofyear("date"))
  .withColumn("dow", F.dayofweek("date"))
  .withColumn("is_weekend", F.col("dow").isin(1,7))
)

dim_date.writeTo("nessie.curated.dim_date").createOrReplace()

### dim_exchange

In [26]:
dim_exch = (stg_union
  .select(F.upper(F.col("exchange")).alias("exchange_code"),
          F.col("country"))
  .where(F.col("exchange_code").isNotNull())
  .distinct()
  .withColumn("exchange_sk",
      F.crc32(F.concat_ws(":", F.col("exchange_code"), F.col("country"))).cast("bigint"))
)

dim_exch.writeTo("nessie.curated.dim_exchange").createOrReplace()

### dim_currency

In [27]:
import pycountry

# --- 1. TẠO BẢNG TRA CỨU TÊN TIỀN TỆ (Thay thế UDF) ---
# Cách này hiệu quả hơn nhiều so với UDF
print("Đang tạo bảng tra cứu tên tiền tệ (df_names_lookup)...")
currency_name_data = []
# Lấy tất cả tiền tệ từ thư viện pycountry
for currency in pycountry.currencies:
    currency_name_data.append((currency.alpha_3, currency.name))

# Thêm 'VND' (vì pycountry không có)
currency_name_data.append(("VND", "Vietnamese Dong"))

df_names_lookup = spark.createDataFrame(
    currency_name_data, 
    ["currency_code", "currency_name"]
)

# --- 2. TẠO BẢNG TRA CỨU TỶ GIÁ (Từ cấu trúc JSON) ---
print("Đang tạo bảng tra cứu tỷ giá (df_rates_lookup)...")

# Khai báo tỷ giá bằng cấu trúc Python dict (giống JSON)
rates_json_style = {
    "VND": 1,
    "USD": 25000,
    "JPY": 170
    # Bạn có thể thêm các đồng tiền khác vào đây
}

# Chuyển đổi dict sang danh sách (list) các tuple để Spark đọc
rate_data_list = list(rates_json_style.items())

# Tạo DataFrame tra cứu tỷ giá
df_rates_lookup = spark.createDataFrame(
    rate_data_list, 
    ["currency_code", "exchange_rate_to_vnd"]
)

print("Done")

# 3.1. Lấy danh sách currency_code duy nhất từ nguồn
dim_cur_base = (stg_union
    .select(F.upper(F.col("currency")).alias("currency_code")) # UPPER-case để chuẩn hoá
    .where(F.col("currency_code").isNotNull())
    .distinct()
)

# 3.2. Join với Bảng Tra Cứu Tên
# Dùng F.broadcast() vì bảng lookup rất nhỏ, giúp tăng tốc độ join
dim_cur_with_name = dim_cur_base.join(
    F.broadcast(df_names_lookup),
    "currency_code",
    "left"  # Giữ lại code dù không tìm thấy tên
)

# 3.3. Join với Bảng Tra Cứu Tỷ Giá
dim_cur_with_rate = dim_cur_with_name.join(
    F.broadcast(df_rates_lookup),
    "currency_code",
    "left"  # Giữ lại code dù không tìm thấy tỷ giá
)

# 3.4. Thêm Surrogate Key (SK) và sắp xếp cột
dim_currency = (dim_cur_with_rate
    .withColumn("currency_sk", F.crc32(F.col("currency_code")).cast("bigint"))
    # Sắp xếp lại các cột theo đúng thứ tự bạn muốn
    .select(
        "currency_sk",
        "currency_code",
        "currency_name",
        "exchange_rate_to_vnd"
    )
)

dim_currency.writeTo("nessie.curated.dim_currency").createOrReplace()


Đang tạo bảng tra cứu tên tiền tệ (df_names_lookup)...
Đang tạo bảng tra cứu tỷ giá (df_rates_lookup)...
Done


### dim_trading_status

In [None]:
dim_ts = (stg_union
  .where(F.col("country")=="VN")
  .select(F.col("trading_status_code").alias("status_code"),
          F.col("trading_status_group").alias("status_group"),
          F.col("trading_status").alias("trading_status"))
  .where(F.col("status_code").isNotNull() | F.col("status_group").isNotNull())
  .distinct()
  .withColumn("trading_status_sk",
      F.crc32(F.concat_ws(":", F.coalesce("status_code", F.lit("")),
                              F.coalesce("status_group", F.lit("")))).cast("bigint"))
)

dim_ts.writeTo("nessie.curated.dim_trading_status").createOrReplace()


### dim_company

In [29]:
from pyspark.sql import Window as W

comp_cols = ["company_name","company_name_jp","sector","industry","website","employees","exchange"]
comp = (stg_union
  .select("symbol","country","datadate", *comp_cols)
  .where(F.col("symbol").isNotNull() & F.col("datadate").isNotNull())
  .withColumn("attr_fingerprint", F.sha2(F.concat_ws("||", *[F.coalesce(F.col(c).cast("string"), F.lit("")) for c in comp_cols]), 256))
  .dropDuplicates(["symbol","country","datadate","attr_fingerprint"])
)

w = W.partitionBy("symbol","country").orderBy("datadate")
scd2 = (comp
  .withColumn("prev_fp", F.lag("attr_fingerprint").over(w))
  .withColumn("chg", F.when(F.col("prev_fp").isNull() | (F.col("prev_fp")!=F.col("attr_fingerprint")), F.lit(1)).otherwise(F.lit(0)))
  .withColumn("grp", F.sum("chg").over(w))  # nhóm phiên bản
)

# Tính effective_from cho từng phiên bản (grp), rồi dùng LEAD để lấy next_effective_from
rng_base = (scd2
  .groupBy("symbol","country","grp")
  .agg(F.min("datadate").alias("effective_from"))
)

w_ver = W.partitionBy("symbol","country").orderBy("effective_from")

rng = (rng_base
  .withColumn("next_effective_from", F.lead("effective_from").over(w_ver))
  # Rule bạn yêu cầu:
  # - Nếu có phiên bản kế tiếp tại ngày D_next, bản ghi hiện tại đóng vào đúng ngày D_next
  # - Nếu không có phiên bản kế tiếp -> đang hiệu lực, set 9999-12-31
  .withColumn(
      "effective_to",
      F.coalesce(F.col("next_effective_from"), F.to_date(F.lit("9999-12-31")))
  )
  .withColumn("is_current", F.col("next_effective_from").isNull())
  .drop("next_effective_from")
)

dim_company = (scd2
  .join(rng, ["symbol","country","grp"], "inner")
  .drop("prev_fp","chg")
  .withColumn("company_sk",
      F.crc32(F.concat_ws(":", "symbol","country", F.col("effective_from").cast("string"))).cast("bigint"))
  .withColumnRenamed("exchange","exchange_code")
  .select("company_sk","symbol","country","exchange_code","company_name","company_name_jp",
          "sector","industry","website","employees","effective_from","effective_to","is_current")
)

dim_company.writeTo("nessie.curated.dim_company").createOrReplace()


## fact_stock_daily

In [30]:
from pyspark.sql import functions as F, Window as W

# ---- Load DIMs
date_df = spark.table("nessie.curated.dim_date").select("date_sk","date")
exch_df = spark.table("nessie.curated.dim_exchange") \
               .select("exchange_sk", F.upper(F.col("exchange_code")).alias("ex_code"), F.col("country").alias("ex_country"))
cur_df  = spark.table("nessie.curated.dim_currency").select("currency_sk","currency_code")
ts_df   = spark.table("nessie.curated.dim_trading_status") \
               .select("trading_status_sk", "status_code", "status_group")
comp_all = spark.table("nessie.curated.dim_company") \
                .select("company_sk","symbol","country","effective_from","effective_to")

# ---- Base từ STG: gom cả cột price + snapshot
base = (stg_union
  .select("symbol","country","datadate","exchange","currency",
          "current_price","previous_close","ref_price","ceiling","floor",
          "delta_in_week","delta_in_month","delta_in_year","avg_match_vol_2w",
          "trading_status_code","trading_status_group",
          "market_cap","employees","outstanding_share","issue_share","foreign_percent")
  .where(F.col("symbol").isNotNull() & F.col("datadate").isNotNull())
  .withColumn("date_sk", F.date_format("datadate","yyyyMMdd").cast("int"))
  .withColumn("pct_change",
      F.when(F.col("previous_close").isNotNull() & (F.col("previous_close")!=0),
             (F.col("current_price")-F.col("previous_close"))/F.col("previous_close")))
  .withColumn("is_limit_up",   F.col("ceiling").isNotNull() & (F.col("current_price")>=F.col("ceiling")))
  .withColumn("is_limit_down", F.col("floor").isNotNull()   & (F.col("current_price")<=F.col("floor")))
)

# ---- Map DIM keys
# date
fact0 = base.join(date_df, "date_sk", "left")

# exchange
fact1 = fact0.join(
    exch_df,
    (F.upper(fact0["exchange"])==exch_df["ex_code"]) & (fact0["country"]==exch_df["ex_country"]),
    "left"
)

# currency
fact2 = fact1.join(cur_df, fact1["currency"]==cur_df["currency_code"], "left")

# trading status (VN)
fact3 = fact2.join(
    ts_df,
    (fact2["trading_status_code"]==ts_df["status_code"]) & (fact2["trading_status_group"]==ts_df["status_group"]),
    "left"
)

# company PIT join: datadate ∈ [effective_from, effective_to]
fact4 = fact3.alias("f").join(
    comp_all.alias("d"),
    (F.col("f.symbol")==F.col("d.symbol")) &
    (F.col("f.country")==F.col("d.country")) &
    (F.col("f.datadate")>=F.col("d.effective_from")) &
    (F.col("f.datadate")<=F.col("d.effective_to")),
    "left"
)

fact_equity_daily = fact4.select(
    F.col("f.date_sk"),
    F.col("d.company_sk"),
    F.col("exchange_sk"),
    F.col("currency_sk"),
    F.col("trading_status_sk"),
    F.col("f.current_price"), F.col("f.previous_close"), F.col("f.ref_price"),
    F.col("f.ceiling"), F.col("f.floor"), F.col("f.pct_change"),
    F.col("f.delta_in_week"), F.col("f.delta_in_month"), F.col("f.delta_in_year"),
    F.col("f.avg_match_vol_2w"), F.col("f.is_limit_up"), F.col("f.is_limit_down"),
    # snapshot attributes
    F.col("f.market_cap"), F.col("f.employees"), F.col("f.outstanding_share"),
    F.col("f.issue_share"), F.col("f.foreign_percent")
)

table_name = f"nessie.curated.fact_stock_daily"
partition_key = "date_sk"

if not spark.catalog.tableExists(table_name):
    print("Bảng chưa tồn tại. Đang tạo bảng mới (saveAsTable)...")
    (
        fact_equity_daily.writeTo(table_name)
                 .partitionedBy(partition_key)
                 .createOrReplace()
                 
    )
else:
    print("Bảng đã tồn tại. Ghi đè partition (insertInto)...")
    # Yêu cầu "có rồi thì overwritePartition"
    (
        fact_equity_daily.writeTo(table_name)
                 .partitionedBy(partition_key)
                 .overwritePartitions()
                 
    )


Bảng đã tồn tại. Ghi đè partition (insertInto)...
